

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row


/**
 * Minimal Flink batch job that reads rows from a MySQL table through
 * [[JDBCInputFormat]] and prints them.
 *
 * All connection settings (driver, URL, credentials, query) are hard-coded
 * in [[jdbcRead]].
 */
object Flink_MysqlSource {

  /**
   * Entry point: obtains the execution environment, builds the JDBC-backed
   * data set and calls `print()` on it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val inputMysql: DataSet[Row] = jdbcRead(env)

    inputMysql.print()
  }

  /**
   * Creates a data set of [[Row]]s backed by the query `select * from titles`.
   *
   * The declared row type has four fields, in order: INT, STRING, DATE, DATE.
   * NOTE(review): these are expected to line up with the columns the query
   * returns -- confirm against the actual table schema.
   *
   * @param env execution environment the input is registered with
   * @return the data set produced by `env.createInput` for the JDBC input format
   */
  def jdbcRead(env: ExecutionEnvironment): DataSet[Row] = {
    // Field types of each produced Row, in result-set column order.
    val titlesRowType = new RowTypeInfo(
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.DATE_TYPE_INFO,
      BasicTypeInfo.DATE_TYPE_INFO)

    val titlesFormat = JDBCInputFormat.buildJDBCInputFormat()
      // JDBC driver class name
      .setDrivername("com.mysql.jdbc.Driver")
      // connection URL
      .setDBUrl("jdbc:mysql://Desktop:3306/employees?useUnicode=true&characterEncoding=utf-8&useSSL=false")
      // NOTE(review): credentials are embedded in source; consider externalizing them.
      .setUsername("appleyuchi")
      .setPassword("appleyuchi")
      .setQuery("select * from titles")
      .setRowTypeInfo(titlesRowType)
      .finish()

    // The last expression is the method's result.
    env.createInput(titlesFormat)
  }

}



